读书是改变命运的最好办法

02 开发一个多 Agent 协同客服系统

开发一个多 Agent 协同客服系统

场景设定:

用户提问:“我的订单为什么还没发货?”

系统拆解任务:

Agent A:查询订单状态(调用内部系统)

Agent B:检查物流信息(调用第三方 API)

Agent C:汇总结果并生成回复

技术难点:

如何设计 Agent 之间的通信协议?

如何保证失败重试机制?

如何可视化 Agent 之间的交互过程

好的,很高兴能帮助您开发一个多 Agent 协同客服系统。这是一个非常实用的项目,可以大大提升客户服务的效率和体验。

了解需求

您希望构建一个多 Agent 协同客服系统,用于处理用户提出的客服问题。您提供了一个具体场景:“我的订单为什么还没发货?”,并拆解了相应的 Agent 职责:

  • Agent A: 查询订单状态(调用内部系统)
  • Agent B: 检查物流信息(调用第三方 API)
  • Agent C: 汇总结果并生成回复

您也提出了三个关键的技术难点:

  1. 如何设计 Agent 之间的通信协议?
  2. 如何保证失败重试机制?
  3. 如何可视化 Agent 之间的交互过程?

我将针对这些问题提供详细的解决方案和代码示例。

概略介绍解决方案

我们将采用一个中心化的协调者(Orchestrator)模式来管理 Agent 之间的通信和任务流。每个 Agent 都是一个独立的模块,负责执行特定任务,并通过协调者进行消息传递。

系统工作原理概述:

  1. 用户提出问题,例如:“我的订单为什么还没发货?”
  2. 协调者接收到用户问题,并根据预设规则(或通过自然语言处理)识别出需要激活的 Agent。
  3. 协调者将任务分配给 Agent A(查询订单状态)。
  4. Agent A 执行查询,并将结果返回给协调者。
  5. 协调者判断是否需要 Agent B 介入,如果需要,将任务分配给 Agent B(检查物流信息),并可能将 Agent A 的部分结果作为输入传递给 Agent B。
  6. Agent B 执行查询,并将结果返回给协调者。
  7. 协调者在收到所有必要信息后,将所有相关结果传递给 Agent C(汇总结果并生成回复)。
  8. Agent C 根据收到的信息生成最终的用户回复,并将其返回给协调者。
  9. 协调者将最终回复呈现给用户。

开发步骤:

我们将分以下几个步骤来实现:

  1. 定义 Agent 接口和基类: 确保所有 Agent 都遵循统一的结构。
  2. 实现协调者(Orchestrator): 负责任务分发、结果汇总和通信管理。
  3. 实现 Agent A (查询订单状态): 模拟内部系统调用。
  4. 实现 Agent B (检查物流信息): 模拟第三方 API 调用。
  5. 实现 Agent C (汇总结果并生成回复): 模拟智能回复生成。
  6. 设计 Agent 之间的通信协议: 使用消息队列或事件总线。
  7. 实现失败重试机制: 在协调者或 Agent 内部处理。
  8. 可视化 Agent 交互过程: 使用日志和简单的状态图。

假设和限制:

  • 我们将使用 Python 作为开发语言。
  • 内部系统和第三方 API 调用将用模拟数据代替,以简化示例。
  • 自然语言处理部分将简化,主要侧重于 Agent 之间的协同。
  • 可视化将是基于文本的日志输出,而不是复杂的图形界面。

展示代码和实现说明

我们将从基础结构开始,逐步构建系统。

1. 定义 Agent 接口和基类

首先,我们定义一个抽象基类 Agent,所有具体的 Agent 都将继承它。这将确保它们都有一个 execute 方法,用于执行其核心逻辑。

import abc

class Agent(abc.ABC):
    """
    Agent 的抽象基类。
    所有具体的 Agent 都应继承此基类并实现 execute 方法。
    """
    def __init__(self, name: str):
        self.name = name

    @abc.abstractmethod
    def execute(self, task_data: dict) -> dict:
        """
        执行 Agent 的核心逻辑。

        Args:
            task_data (dict): 包含 Agent 执行任务所需输入数据的字典。

        Returns:
            dict: 包含 Agent 执行结果的字典。
        """
        pass

    def __str__(self):
        return f"Agent({self.name})"

设计思路:

  • abc.ABC 用于定义抽象基类,强制子类实现 execute 方法。
  • __init__ 方法用于初始化 Agent 的名称。
  • execute 方法是 Agent 的核心,它接收一个 task_data 字典作为输入,并返回一个包含执行结果的字典。这种字典结构允许灵活地传递各种类型的数据。

2. 实现协调者(Orchestrator)

协调者是系统的核心,负责管理 Agent 的生命周期、任务分配、结果收集和通信。

Python

import time
import uuid
from typing import Dict, Type, Any, Callable

class Orchestrator:
    """
    负责协调不同 Agent 之间任务流的调度器。
    它管理 Agent 注册、任务分发、结果收集和错误处理。
    """
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_queue: list = [] # 简单队列,实际生产环境可用消息队列
        self.results: Dict[str, Any] = {}
        self.event_log: list = [] # 用于记录交互过程

    def register_agent(self, agent: Agent):
        """
        注册一个 Agent 到协调器。
        """
        if agent.name in self.agents:
            print(f"警告: Agent '{agent.name}' 已注册,将被覆盖。")
        self.agents[agent.name] = agent
        self._log_event(f"Agent '{agent.name}' 已注册。")

    def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
        """
        记录 Agent 交互事件,用于可视化和调试。
        """
        timestamp = time.time()
        log_entry = {
            "timestamp": timestamp,
            "description": event_description,
            "task_id": task_id,
            "agent_name": agent_name,
            "data": data
        }
        self.event_log.append(log_entry)
        print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")

    def process_user_query(self, query: str) -> str:
        """
        处理用户查询,启动 Agent 协同流程。
        """
        task_id = str(uuid.uuid4())
        self.results[task_id] = {}
        self._log_event(f"收到用户查询: '{query}'", task_id=task_id)

        # 示例:根据查询内容初步判断需要激活哪些 Agent
        # 实际应用中,这里会集成 NLP 或更复杂的规则引擎
        if "订单" in query or "发货" in query:
            self._log_event(f"识别到订单/发货相关查询,启动 Agent A 和 Agent B", task_id=task_id)
            self.task_queue.append({"agent": "AgentA", "task_id": task_id, "input": {"query": query}})
            self.task_queue.append({"agent": "AgentB", "task_id": task_id, "input": {"query": query}})
        else:
            self._log_event(f"无法识别查询类型,直接交给 Agent C 回复", task_id=task_id)
            self.task_queue.append({"agent": "AgentC", "task_id": task_id, "input": {"query": query, "status": "无法识别", "logistics": "无信息"}})


        # 执行任务队列中的 Agent
        while self.task_queue:
            task_item = self.task_queue.pop(0) # FIFO
            agent_name = task_item["agent"]
            current_task_id = task_item["task_id"]
            task_input = task_item["input"]

            if agent_name not in self.agents:
                self._log_event(f"错误: 未找到 Agent '{agent_name}'", task_id=current_task_id)
                continue

            agent = self.agents[agent_name]
            self._log_event(f"正在执行 Agent '{agent.name}' (任务ID: {current_task_id})", task_id=current_task_id, agent_name=agent.name, data=task_input)

            try:
                # 尝试执行 Agent,并实现重试机制
                retries = 3
                for attempt in range(retries):
                    try:
                        agent_result = agent.execute(task_input)
                        self._log_event(f"Agent '{agent.name}' 执行成功,结果: {agent_result}", task_id=current_task_id, agent_name=agent.name, data=agent_result)
                        self.results[current_task_id].update({agent.name: agent_result}) # 汇总结果
                        break # 成功则跳出重试循环
                    except Exception as e:
                        self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
                        if attempt < retries - 1:
                            time.sleep(1) # 重试前等待
                        else:
                            self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
                            self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
                            # 如果某个关键 Agent 失败,可能需要中断或退回,这里简化为继续
            except Exception as e:
                self._log_event(f"Orchestrator 处理 Agent '{agent.name}' 时发生意外错误: {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
                self.results[current_task_id].update({agent.name: {"error": "协调器处理错误", "details": str(e)}})


        # 所有前置 Agent 执行完毕后,将结果传递给 Agent C 生成回复
        final_result_for_agent_c = self.results.get(task_id, {})
        self._log_event(f"将所有 Agent 结果汇总给 Agent C: {final_result_for_agent_c}", task_id=task_id, agent_name="AgentC")

        # 确保 AgentC 被执行
        if "AgentC" not in self.agents:
             self._log_event(f"错误: 未注册 AgentC,无法生成回复", task_id=task_id)
             return "系统错误:无法生成回复。"

        try:
            retries = 3
            final_response = "系统错误:无法生成回复。"
            for attempt in range(retries):
                try:
                    agent_c_result = self.agents["AgentC"].execute(final_result_for_agent_c)
                    final_response = agent_c_result.get("response", "未能生成有效回复。")
                    self._log_event(f"Agent C 生成最终回复: {final_response}", task_id=task_id, agent_name="AgentC", data=agent_c_result)
                    break
                except Exception as e:
                    self._log_event(f"Agent C 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
                    if attempt < retries - 1:
                        time.sleep(1)
            return final_response
        except Exception as e:
            self._log_event(f"Orchestrator 处理 Agent C 时发生意外错误: {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
            return f"系统在生成回复时遇到错误: {e}"

    def get_event_log(self) -> list:
        """
        获取所有记录的事件日志。
        """
        return self.event_log

设计思路:

  • __init__: 初始化注册的 Agent 字典、任务队列、结果存储和事件日志。

  • register_agent: 用于将具体的 Agent 实例添加到协调器中。

  • _log_event: 这是一个关键方法,用于记录系统内部的所有重要事件,包括 Agent 的启动、完成、失败等。它将用于后续的可视化。

  • process_user_query:

这是整个系统的入口点。

  • 它为每个用户查询生成一个唯一的 task_id,以便跟踪。
  • 任务拆解与分发: 示例中通过简单的关键字匹配 ("订单" 或 "发货") 来决定激活哪些 Agent。在实际应用中,这里会集成更复杂的 NLP 模型(如意图识别、实体抽取)来智能地分配任务。
  • 任务队列 (self.task_queue): 这是一个简单的列表,用于存储待执行的任务。在生产环境中,这通常会替换为异步消息队列(如 RabbitMQ, Kafka),以支持解耦、削峰和异步处理。
  • 结果汇总 (self.results): 用于存储不同 Agent 执行的结果。
  • 失败重试机制: 在 while self.task_queue: 循环中,每次执行 Agent 时,都尝试 retries 次。如果 Agent 抛出异常,会等待一段时间后重试。
  • Agent 之间的通信协议: 这里的通信协议是隐式的,通过协调者传递 task_data 字典来实现。每个 Agent 接收输入字典,并返回输出字典。协调者负责将一个 Agent 的输出作为另一个 Agent 的输入。

  • get_event_log: 提供访问事件日志的接口。

3. 实现 Agent A (查询订单状态)

模拟内部系统查询,例如数据库查询。

Python

import random

class AgentA(Agent):
    """
    模拟查询订单状态的 Agent。
    """
    def __init__(self):
        super().__init__("AgentA")

    def execute(self, task_data: dict) -> dict:
        order_id = task_data.get("order_id") # 假设从用户查询中提取到订单ID
        if not order_id:
            # 模拟从用户查询中尝试识别订单号
            query = task_data.get("query", "")
            if "订单号" in query: # 简单模拟提取
                parts = query.split("订单号")
                if len(parts) > 1:
                    order_id = parts[1].strip().split(" ")[0] # 提取第一个单词作为订单号
            if not order_id:
                # 随机生成一个模拟订单号用于演示
                order_id = f"ORDER_{random.randint(1000, 9999)}"

        print(f"AgentA: 正在查询订单 {order_id} 的状态...")
        time.sleep(random.uniform(0.5, 1.5)) # 模拟查询耗时

        # 模拟内部系统查询结果
        statuses = ["已付款待发货", "已发货", "已完成", "已取消"]
        random_status = random.choice(statuses)

        # 模拟随机失败
        if random.random() < 0.1: # 10% 的概率失败
            raise Exception("内部系统查询失败:数据库连接超时。")

        return {"order_id": order_id, "status": random_status}

设计思路:

  • 继承 Agent 基类。
  • execute 方法接收 task_data,尝试从其中提取 order_id。如果用户查询中没有明确提供,则模拟生成一个。
  • time.sleep 模拟网络延迟或数据库查询时间。
  • random.choice 模拟不同的订单状态。
  • 失败模拟: if random.random() < 0.1: 模拟了 Agent 在执行过程中可能遇到的失败,这将触发协调者的重试机制。

4. 实现 Agent B (检查物流信息)

模拟调用第三方物流 API。

Python

class AgentB(Agent):
    """
    模拟检查物流信息的 Agent。
    """
    def __init__(self):
        super().__init__("AgentB")

    def execute(self, task_data: dict) -> dict:
        order_id = task_data.get("order_id")
        if not order_id:
            # 如果 Agent A 没有提供,尝试从用户查询中提取
            query = task_data.get("query", "")
            if "订单号" in query:
                parts = query.split("订单号")
                if len(parts) > 1:
                    order_id = parts[1].strip().split(" ")[0]
            if not order_id:
                # 随机生成一个模拟订单号用于演示
                order_id = f"ORDER_{random.randint(1000, 9999)}"

        print(f"AgentB: 正在检查订单 {order_id} 的物流信息...")
        time.sleep(random.uniform(0.7, 2.0)) # 模拟 API 调用耗时

        # 模拟第三方物流 API 结果
        logistics_info = {
            "tracking_number": f"TRK{random.randint(100000, 999999)}",
            "last_update": f"2025-06-{random.randint(10, 16)} 14:30",
            "location": random.choice(["分拣中心", "运输中", "派送中", "已签收"]),
            "estimated_delivery": f"2025-06-{random.randint(17, 20)}"
        }

        # 模拟随机失败
        if random.random() < 0.15: # 15% 的概率失败
            raise Exception("第三方物流 API 调用失败:服务不可用。")

        return {"order_id": order_id, "logistics": logistics_info}

设计思路:

  • 与 Agent A 类似,从 task_data 中获取 order_id。
  • time.sleep 模拟 API 调用延迟。
  • logistics_info 模拟第三方 API 返回的数据结构。
  • 失败模拟: 模拟第三方 API 调用的失败情况。

5. 实现 Agent C (汇总结果并生成回复)

这个 Agent 负责整合所有信息,并生成一个友好的回复。

Python

class AgentC(Agent):
    """
    汇总所有 Agent 结果并生成回复的 Agent。
    """
    def __init__(self):
        super().__init__("AgentC")

    def execute(self, task_data: dict) -> dict:
        print(f"AgentC: 正在汇总结果并生成回复...")
        time.sleep(0.3) # 模拟处理时间

        agent_a_result = task_data.get("AgentA", {})
        agent_b_result = task_data.get("AgentB", {})

        order_id = agent_a_result.get("order_id", agent_b_result.get("order_id", "未知订单"))
        status = agent_a_result.get("status", "未知状态")
        logistics = agent_b_result.get("logistics", {})

        response_parts = []
        response_parts.append(f"关于您的订单 {order_id},")

        if "error" in agent_a_result:
            response_parts.append(f"查询订单状态时遇到问题:{agent_a_result['error']}。")
        else:
            response_parts.append(f"当前状态显示为:'{status}'。")

        if "error" in agent_b_result:
            response_parts.append(f"查询物流信息时遇到问题:{agent_b_result['error']}。")
        else:
            if logistics:
                response_parts.append(f"最新物流更新:您的包裹目前在'{logistics.get('location', '未知位置')}',追踪号为'{logistics.get('tracking_number', '无')}'。")
                response_parts.append(f"预计送达日期:{logistics.get('estimated_delivery', '未知')}。")
            else:
                response_parts.append(f"暂无详细物流信息。")

        response_parts.append("感谢您的耐心等待!")

        final_response = " ".join(response_parts)

        # 模拟随机失败
        if random.random() < 0.05: # 5% 的概率失败
            raise Exception("回复生成服务故障。")

        return {"response": final_response}

设计思路:

  • 从 task_data 中获取 AgentA 和 AgentB 的结果。
  • 根据获取到的信息(包括可能的错误信息),动态生成用户友好的回复。
  • 这个 Agent 是整个流程的收尾,将所有碎片信息整合。
  • 失败模拟: 模拟回复生成过程中的失败。

6. 设计 Agent 之间的通信协议

如前所述,我们采用了基于协调者的隐式通信协议

  • 数据结构: 所有 Agent 之间的通信都通过 dict 数据类型进行。协调者将上一个 Agent 的输出(一个 dict)作为下一个 Agent 的输入(task_data 字典)的一部分。
  • 消息传递: 协调者的 task_queue 充当一个简化的消息队列。当一个 Agent 完成任务后,它的结果会被协调者存储,并在需要时传递给下一个 Agent。
  • 统一接口: Agent 抽象基类强制所有 Agent 实现 execute(task_data: dict) -> dict 接口,确保了通信的标准化。

优点:

  • 简单易实现: 对于小型系统,这种方式足够。
  • 集中控制: 协调者对整个流程有完全的控制权,便于调试和监控。
  • 灵活数据传递: 字典结构允许传递任意复杂的数据。

缺点(对于大规模分布式系统):

  • 单点故障: 协调者是核心,如果它崩溃,整个系统将停摆。
  • 可伸缩性: 协调者可能成为瓶颈,尤其是在处理大量并发请求时。
  • 解耦性: Agent 之间虽然不直接通信,但它们依然依赖于协调者的存在。

改进方案(对于生产环境):

  • 消息队列(如 RabbitMQ, Kafka): 每个 Agent 订阅自己感兴趣的消息,并将结果发布到另一个主题。协调者只负责初始任务分发和最终结果聚合。这将大大提高系统的解耦性、可伸缩性和容错性。
  • API Gateway/微服务: 每个 Agent 作为一个独立的微服务,通过 RESTful API 或 gRPC 进行通信。协调者调用这些服务并编排流程。
  • 共享数据库/KV 存储: Agent 可以将结果写入共享存储,其他 Agent 从中读取所需数据。

7. 失败重试机制

我们已经在 Orchestrator 的 process_user_query 方法中实现了简单的失败重试机制。

Python

            try:
                # 尝试执行 Agent,并实现重试机制
                retries = 3 # 定义重试次数
                for attempt in range(retries):
                    try:
                        agent_result = agent.execute(task_input)
                        # ... 成功处理 ...
                        break # 成功则跳出重试循环
                    except Exception as e:
                        self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
                        if attempt < retries - 1:
                            time.sleep(1) # 重试前等待1秒
                        else:
                            self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
                            self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
            except Exception as e:
                # ... 处理协调器内部的错误 ...

实现说明:

  • 重试循环: 在 Orchestrator 中,每个 Agent 的 execute 调用都被包裹在一个 for 循环中,允许进行多次尝试 (retries 次)。
  • 异常捕获: 使用 try...except 块来捕获 Agent 执行时可能抛出的任何异常。
  • 间隔重试: time.sleep(1) 在每次重试前增加短暂的延迟,避免立即重试导致的服务压力。
  • 日志记录: 每次尝试和最终失败都会被记录到事件日志中,便于跟踪问题。
  • 最终失败处理: 如果所有重试都失败,会记录最终的失败状态,并可能在结果中标记错误,以便 Agent C 进行处理。

更高级的重试策略:

  • 指数退避: 每次重试的等待时间逐渐增加(例如,1s, 2s, 4s, 8s),以避免对失败的服务造成过多压力。
  • 熔断器模式: 当某个 Agent 连续失败达到一定阈值时,暂时停止对其的调用,避免浪费资源和进一步恶化问题。一段时间后才尝试恢复调用。
  • 死信队列: 对于无法处理的消息或任务,将其发送到死信队列,以便后续分析和手动干预。

8. 可视化 Agent 之间的交互过程

我们通过 Orchestrator 中的 _log_event 方法和 event_log 列表来实现简单的可视化。

Python

class Orchestrator:
    # ...
    def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
        """
        记录 Agent 交互事件,用于可视化和调试。
        """
        timestamp = time.time()
        log_entry = {
            "timestamp": timestamp,
            "description": event_description,
            "task_id": task_id,
            "agent_name": agent_name,
            "data": data
        }
        self.event_log.append(log_entry)
        print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")
    # ...

实现说明:

  • 每次关键操作(如接收查询、Agent 启动、Agent 完成、Agent 失败)都会调用 _log_event。
  • 日志条目包含时间戳、描述、任务 ID、Agent 名称和相关数据,提供详细的上下文信息。
  • print 语句将日志实时输出到控制台。
  • event_log 列表收集所有日志条目,允许在流程结束后进行分析。

简单的可视化示例(运行代码后查看控制台输出):

当您运行系统时,控制台将输出类似以下内容的日志:

[日志] 10:41:40 - 收到用户查询: '我的订单为什么还没发货?' (任务ID: ...)
[日志] 10:41:40 - 识别到订单/发货相关查询,启动 Agent A 和 Agent B (任务ID: ...)
[日志] 10:41:40 - 正在执行 Agent 'AgentA' (任务ID: ...)
AgentA: 正在查询订单 ORDER_xxxx 的状态...
[日志] 10:41:41 - Agent 'AgentA' 执行成功,结果: {'order_id': 'ORDER_xxxx', 'status': '已付款待发货'} (任务ID: ...)
[日志] 10:41:41 - 正在执行 Agent 'AgentB' (任务ID: ...)
AgentB: 正在检查订单 ORDER_xxxx 的物流信息...
[日志] 10:41:42 - Agent 'AgentB' 执行成功,结果: {'order_id': 'ORDER_xxxx', 'logistics': {...}} (任务ID: ...)
[日志] 10:41:42 - 将所有 Agent 结果汇总给 Agent C: {...} (任务ID: ...)
[日志] 10:41:42 - 正在执行 Agent 'AgentC' (任务ID: ...)
AgentC: 正在汇总结果并生成回复...
[日志] 10:41:42 - Agent C 生成最终回复: 关于您的订单 ORDER_xxxx,当前状态显示为:'已付款待发货'。最新物流更新:... 感谢您的耐心等待! (任务ID: ...)

如果发生错误,您会看到类似:

[日志] 10:41:45 - Agent 'AgentA' 执行失败 (尝试 1/3): 内部系统查询失败:数据库连接超时。 (任务ID: ...)
[日志] 10:41:46 - 正在执行 Agent 'AgentA' (任务ID: ...)
AgentA: 正在查询订单 ORDER_xxxx 的状态...
[日志] 10:41:47 - Agent 'AgentA' 执行成功,结果: {...} (任务ID: ...)

更高级的可视化:

  • 图表工具: 使用 Mermaid、Graphviz 等工具生成流程图或状态图,根据 event_log 动态绘制 Agent 间的交互流程。
  • Web Dashboard: 构建一个简单的 Web 界面,实时显示 Agent 状态、任务队列、错误日志和交互流程的可视化。
  • 分布式追踪系统: 集成 OpenTracing 或 OpenTelemetry,将 Agent 之间的调用链路进行追踪和可视化(例如使用 Jaeger)。

完整代码示例

将所有组件组合在一起:

Python

import abc
import time
import uuid
import random
from typing import Dict, Type, Any, Callable

# 1. 定义 Agent 接口和基类
class Agent(abc.ABC):
    """
    Agent 的抽象基类。
    所有具体的 Agent 都应继承此基类并实现 execute 方法。
    """
    def __init__(self, name: str):
        self.name = name

    @abc.abstractmethod
    def execute(self, task_data: dict) -> dict:
        """
        执行 Agent 的核心逻辑。

        Args:
            task_data (dict): 包含 Agent 执行任务所需输入数据的字典。

        Returns:
            dict: 包含 Agent 执行结果的字典。
        """
        pass

    def __str__(self):
        return f"Agent({self.name})"

# 2. 实现协调者(Orchestrator)
class Orchestrator:
    """
    负责协调不同 Agent 之间任务流的调度器。
    它管理 Agent 注册、任务分发、结果收集和错误处理。
    """
    def __init__(self):
        self.agents: Dict[str, Agent] = {}
        self.task_queue: list = [] # 简单队列,实际生产环境可用消息队列
        self.results: Dict[str, Any] = {}
        self.event_log: list = [] # 用于记录交互过程

    def register_agent(self, agent: Agent):
        """
        注册一个 Agent 到协调器。
        """
        if agent.name in self.agents:
            print(f"警告: Agent '{agent.name}' 已注册,将被覆盖。")
        self.agents[agent.name] = agent
        self._log_event(f"Agent '{agent.name}' 已注册。")

    def _log_event(self, event_description: str, task_id: str = None, agent_name: str = None, data: Any = None):
        """
        记录 Agent 交互事件,用于可视化和调试。
        """
        timestamp = time.time()
        log_entry = {
            "timestamp": timestamp,
            "description": event_description,
            "task_id": task_id,
            "agent_name": agent_name,
            "data": data
        }
        self.event_log.append(log_entry)
        print(f"[日志] {time.strftime('%H:%M:%S', time.localtime(timestamp))} - {event_description}")

    def process_user_query(self, query: str) -> str:
        """
        处理用户查询,启动 Agent 协同流程。
        """
        task_id = str(uuid.uuid4())
        self.results[task_id] = {}
        self._log_event(f"收到用户查询: '{query}'", task_id=task_id)

        # 示例:根据查询内容初步判断需要激活哪些 Agent
        # 实际应用中,这里会集成 NLP 或更复杂的规则引擎
        if "订单" in query or "发货" in query:
            self._log_event(f"识别到订单/发货相关查询,启动 Agent A 和 Agent B", task_id=task_id)
            # 添加 Agent A 和 Agent B 的任务
            self.task_queue.append({"agent": "AgentA", "task_id": task_id, "input": {"query": query}})
            self.task_queue.append({"agent": "AgentB", "task_id": task_id, "input": {"query": query}})
        else:
            self._log_event(f"无法识别查询类型,直接交给 Agent C 回复", task_id=task_id)
            # 对于无法识别的查询,直接给 Agent C 一个通用输入
            self.task_queue.append({"agent": "AgentC", "task_id": task_id, "input": {"query": query, "AgentA": {"status": "未知"}, "AgentB": {"logistics": {}}}})


        # 执行任务队列中的 Agent
        # 这里需要注意,Agent B 可能需要 Agent A 的输出,所以需要确保 Agent A 先执行
        # 在这个简单的队列中,我们通过先添加 Agent A 再添加 Agent B 来保证顺序
        # 更复杂的依赖管理需要有向无环图 (DAG) 或专门的工作流引擎
        processed_agents = set() # 记录已处理的 Agent

        while self.task_queue:
            # 找到一个可以执行的任务 (其依赖的 Agent 已经处理完毕)
            current_task_item = None
            for i, task_item in enumerate(self.task_queue):
                agent_name = task_item["agent"]
                # 简化依赖:Agent B 依赖 Agent A,Agent C 依赖 Agent A 和 Agent B
                if agent_name == "AgentA":
                    current_task_item = self.task_queue.pop(i)
                    break
                elif agent_name == "AgentB":
                    if "AgentA" in processed_agents:
                        # 传递 AgentA 的结果给 AgentB
                        task_item["input"].update(self.results[task_item["task_id"]].get("AgentA", {}))
                        current_task_item = self.task_queue.pop(i)
                        break
                elif agent_name == "AgentC":
                    # AgentC 依赖所有前置 Agent,所以等队列为空或只剩AgentC时再执行
                    if len(self.task_queue) == 1 and self.task_queue[0]["agent"] == "AgentC":
                         current_task_item = self.task_queue.pop(i)
                         break

            if not current_task_item:
                # 如果没有可执行的任务,且队列不为空,说明存在循环依赖或逻辑问题
                # 对于本例,表示前置Agent还没执行完
                if self.task_queue and len(processed_agents) < len(self.agents) -1: # -1 因为 AgentC是最后执行
                    time.sleep(0.1) # 等待一下,看是否有Agent完成
                    continue # 继续循环查找可执行任务
                else:
                    break # 队列为空或者只剩下 AgentC 但其依赖还未完全满足,跳出


            agent_name = current_task_item["agent"]
            current_task_id = current_task_item["task_id"]
            task_input = current_task_item["input"]

            if agent_name not in self.agents:
                self._log_event(f"错误: 未找到 Agent '{agent_name}'", task_id=current_task_id)
                continue

            agent = self.agents[agent_name]
            self._log_event(f"正在执行 Agent '{agent.name}' (任务ID: {current_task_id})", task_id=current_task_id, agent_name=agent.name, data=task_input)

            try:
                # 尝试执行 Agent,并实现重试机制
                retries = 3
                for attempt in range(retries):
                    try:
                        agent_result = agent.execute(task_input)
                        self._log_event(f"Agent '{agent.name}' 执行成功,结果: {agent_result}", task_id=current_task_id, agent_name=agent.name, data=agent_result)
                        self.results[current_task_id].update({agent.name: agent_result}) # 汇总结果
                        processed_agents.add(agent.name) # 标记为已处理
                        break # 成功则跳出重试循环
                    except Exception as e:
                        self._log_event(f"Agent '{agent.name}' 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
                        if attempt < retries - 1:
                            time.sleep(1) # 重试前等待
                        else:
                            self._log_event(f"Agent '{agent.name}' 最终执行失败,任务ID: {current_task_id}", task_id=current_task_id, agent_name=agent.name, data={"error": "最终失败"})
                            self.results[current_task_id].update({agent.name: {"error": "执行失败", "details": str(e)}})
                            # 即使失败,也标记为已处理,否则可能卡住
                            processed_agents.add(agent.name)

            except Exception as e:
                self._log_event(f"Orchestrator 处理 Agent '{agent.name}' 时发生意外错误: {e}", task_id=current_task_id, agent_name=agent.name, data={"error": str(e)})
                self.results[current_task_id].update({agent.name: {"error": "协调器处理错误", "details": str(e)}})
                processed_agents.add(agent.name) # 标记为已处理

        # 所有前置 Agent 执行完毕后,将结果传递给 Agent C 生成回复
        final_result_for_agent_c = self.results.get(task_id, {})
        self._log_event(f"将所有 Agent 结果汇总给 Agent C: {final_result_for_agent_c}", task_id=task_id, agent_name="AgentC")

        # 确保 AgentC 被执行
        if "AgentC" not in self.agents:
             self._log_event(f"错误: 未注册 AgentC,无法生成回复", task_id=task_id)
             return "系统错误:无法生成回复。"

        try:
            retries = 3
            final_response = "系统错误:无法生成回复。"
            for attempt in range(retries):
                try:
                    agent_c_result = self.agents["AgentC"].execute(final_result_for_agent_c)
                    final_response = agent_c_result.get("response", "未能生成有效回复。")
                    self._log_event(f"Agent C 生成最终回复: {final_response}", task_id=task_id, agent_name="AgentC", data=agent_c_result)
                    break
                except Exception as e:
                    self._log_event(f"Agent C 执行失败 (尝试 {attempt+1}/{retries}): {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
                    if attempt < retries - 1:
                        time.sleep(1)
            return final_response
        except Exception as e:
            self._log_event(f"Orchestrator 处理 Agent C 时发生意外错误: {e}", task_id=task_id, agent_name="AgentC", data={"error": str(e)})
            return f"系统在生成回复时遇到错误: {e}"

    def get_event_log(self) -> list:
        """
        获取所有记录的事件日志。
        """
        return self.event_log

# 3. 实现 Agent A (查询订单状态)
class AgentA(Agent):
    """
    模拟查询订单状态的 Agent。
    """
    def __init__(self):
        super().__init__("AgentA")

    def execute(self, task_data: dict) -> dict:
        order_id = task_data.get("order_id") 
        if not order_id:
            query = task_data.get("query", "")
            # 模拟从用户查询中尝试识别订单号
            import re
            match = re.search(r'(订单号|订单|order ID|ID)[:\s]*(\w+)', query, re.IGNORECASE)
            if match:
                order_id = match.group(2)
            else:
                order_id = f"ORDER_{random.randint(1000, 9999)}" # 随机生成一个模拟订单号用于演示

        print(f"AgentA: 正在查询订单 {order_id} 的状态...")
        time.sleep(random.uniform(0.5, 1.5)) # 模拟查询耗时

        statuses = ["已付款待发货", "已发货", "已完成", "已取消"]
        random_status = random.choice(statuses)

        # 模拟随机失败
        if random.random() < 0.2: # 增加失败概率来演示重试
            raise Exception("内部系统查询失败:数据库连接超时。")

        return {"order_id": order_id, "status": random_status}

# 4. 实现 Agent B (检查物流信息)
class AgentB(Agent):
    """
    模拟检查物流信息的 Agent。
    """
    def __init__(self):
        super().__init__("AgentB")

    def execute(self, task_data: dict) -> dict:
        order_id = task_data.get("order_id")
        if not order_id:
            # 如果 Agent A 没有提供,尝试从用户查询中提取
            query = task_data.get("query", "")
            import re
            match = re.search(r'(订单号|订单|order ID|ID)[:\s]*(\w+)', query, re.IGNORECASE)
            if match:
                order_id = match.group(2)
            else:
                order_id = f"ORDER_{random.randint(1000, 9999)}"

        print(f"AgentB: 正在检查订单 {order_id} 的物流信息...")
        time.sleep(random.uniform(0.7, 2.0)) # 模拟 API 调用耗时

        logistics_info = {
            "tracking_number": f"TRK{random.randint(100000, 999999)}",
            "last_update": f"2025-06-{random.randint(10, 16)} 14:30",
            "location": random.choice(["分拣中心", "运输中", "派送中", "已签收"]),
            "estimated_delivery": f"2025-06-{random.randint(17, 20)}"
        }

        # 模拟随机失败
        if random.random() < 0.25: # 增加失败概率来演示重试
            raise Exception("第三方物流 API 调用失败:服务不可用。")

        return {"order_id": order_id, "logistics": logistics_info}

# 5. 实现 Agent C (汇总结果并生成回复)
class AgentC(Agent):
    """
    汇总所有 Agent 结果并生成回复的 Agent。
    """
    def __init__(self):
        super().__init__("AgentC")

    def execute(self, task_data: dict) -> dict:
        print(f"AgentC: 正在汇总结果并生成回复...")
        time.sleep(0.3) # 模拟处理时间

        agent_a_result = task_data.get("AgentA", {})
        agent_b_result = task_data.get("AgentB", {})

        # 优先从 AgentA 获取订单ID,如果A失败则从B获取,最后是未知
        order_id = agent_a_result.get("order_id") or agent_b_result.get("order_id") or "未知订单"

        response_parts = []
        response_parts.append(f"关于您的订单 {order_id},")

        # 处理 Agent A 的结果
        if "error" in agent_a_result:
            response_parts.append(f"查询订单状态时遇到问题:{agent_a_result['error']}。")
        else:
            status = agent_a_result.get("status", "未知状态")
            response_parts.append(f"当前状态显示为:'{status}'。")

        # 处理 Agent B 的结果
        if "error" in agent_b_result:
            response_parts.append(f"查询物流信息时遇到问题:{agent_b_result['error']}。")
        else:
            logistics = agent_b_result.get("logistics", {})
            if logistics:
                response_parts.append(f"最新物流更新:您的包裹目前在'{logistics.get('location', '未知位置')}',追踪号为'{logistics.get('tracking_number', '无')}'。")
                response_parts.append(f"预计送达日期:{logistics.get('estimated_delivery', '未知')}。")
            else:
                response_parts.append(f"暂无详细物流信息。")

        response_parts.append("感谢您的耐心等待!")

        final_response = " ".join(response_parts)

        # 模拟随机失败
        if random.random() < 0.05: 
            raise Exception("回复生成服务故障。")

        return {"response": final_response}

# 主程序入口
if __name__ == "__main__":
    orchestrator = Orchestrator()

    # 注册所有 Agent
    orchestrator.register_agent(AgentA())
    orchestrator.register_agent(AgentB())
    orchestrator.register_agent(AgentC())

    print("\n--- 场景一:正常流程 ---")
    user_query_1 = "我的订单号是 123456789,为什么还没发货?"
    response_1 = orchestrator.process_user_query(user_query_1)
    print(f"\n用户提问: '{user_query_1}'")
    print(f"系统回复: '{response_1}'")
    print("\n-----------------------------------\n")

    # 重置日志和结果
    orchestrator.event_log = []
    orchestrator.results = {}
    orchestrator.task_queue = []

    print("\n--- 场景二:模拟 Agent A 和 Agent B 失败重试 ---")
    user_query_2 = "我的订单为什么还没发货?" # 不带订单号,看Agent如何处理
    response_2 = orchestrator.process_user_query(user_query_2)
    print(f"\n用户提问: '{user_query_2}'")
    print(f"系统回复: '{response_2}'")
    print("\n-----------------------------------\n")

    # 重置日志和结果
    orchestrator.event_log = []
    orchestrator.results = {}
    orchestrator.task_queue = []

    print("\n--- 场景三:无法识别的查询 ---")
    user_query_3 = "你好,请问你们的工作时间?"
    response_3 = orchestrator.process_user_query(user_query_3)
    print(f"\n用户提问: '{user_query_3}'")
    print(f"系统回复: '{response_3}'")
    print("\n-----------------------------------\n")

    # 可以选择打印完整的事件日志进行分析
    # print("\n--- 完整的事件日志 ---")
    # for entry in orchestrator.get_event_log():
    #     print(entry)

实现说明:

  • 执行顺序的简化处理: 在 Orchestrator 的 process_user_query 中,为了简化,我通过在 self.task_queue 中添加 Agent 的顺序来暗示其执行顺序(Agent A -> Agent B -> Agent C)。
  • 更完善的依赖管理: 在实际生产环境中,为了处理复杂的 Agent 依赖关系(例如,Agent B 只有在 Agent A 成功后才能启动),您需要一个更强大的工作流引擎,或者使用有向无环图(DAG)来定义任务依赖,例如 Apache Airflow、Prefect 等。
  • 订单号提取: 在 Agent A 和 Agent B 中,我增加了简单的正则表达式来从用户查询中提取订单号,以使模拟更真实。

通过运行这个完整的代码,您将看到一个多 Agent 协同客服系统的基本框架,包括 Agent 间的通信、失败重试和简单的可视化日志。您可以根据需要扩展每个 Agent 的功能,并改进协调者的逻辑。